package com.lambkit.core.flow;

import com.lambkit.core.AppContext;
import com.lambkit.core.CronExecutor;
import com.lambkit.core.LifecycleException;
import com.lambkit.core.LifecycleState;
import com.lambkit.util.Printer;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * @author yangyong(孤竹行)
 */
public class FlowInstance implements IFlowInstance {

    private AppContext appContext;

    private FlowContext context;

    /**
     * 待执行的节点
     */
    private ConcurrentHashMap<String, IFlowNode> nodeMap = new ConcurrentHashMap<String, IFlowNode>();

    private final ThreadPoolExecutor jobExecutor;
    private final CronExecutor cronExecutor;

    private long timeout = 1000 * 60 * 5;//5分钟

    /**
     * 最多等待任务数
     */
    private int maxNodeSize = 1000;

    public FlowInstance(AppContext appContext, FlowContext context) {
        this.appContext = appContext;
        this.context = context;

        ArrayBlockingQueue<Runnable> threadWorkQueue = new ArrayBlockingQueue<Runnable>(10000);
        jobExecutor = new ThreadPoolExecutor(5, 30, 30 * 1000, TimeUnit.MILLISECONDS, threadWorkQueue);
        cronExecutor = new CronExecutor(100, false) {
            @Override
            public void execute() {
                Iterator<Map.Entry<String, IFlowNode>> iterator = nodeMap.entrySet().iterator();
                while (iterator.hasNext()) {
                    //限流
                    if(jobExecutor.getQueue().size() > maxNodeSize) {
                        break;
                    }
                    Map.Entry<String, IFlowNode> entry = iterator.next();
                    IFlowNode flowNode = entry.getValue();
                    if (flowNode == null) {
                        break;
                    }
                    if(flowNode.getCurrentState().equals(LifecycleState.NEW)
                        || flowNode.getCurrentState().equals(LifecycleState.STARTING)) {
                        //iterator.remove();
                        startFlowNode(flowNode);
                    }
                }//while
            }//execute
        };
    }

    private void startFlowNode(IFlowNode flowNode) {
        jobExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    flowNode.start();
                } catch (LifecycleException e) {
                    throw new RuntimeException(e);
                }
            }//run
        });
    }

    @Override
    public void add(IFlowNode flowNode) {
        addNodeToQueue(flowNode);
        openCronSwitch();
    }

    private void addNodeToQueue(IFlowNode flowNode) {
        String nodeid = flowNode.getId();
        if(!nodeMap.containsKey(nodeid)) {
            nodeMap.put(nodeid, flowNode);
            Printer.print(this, "flow", "add node to queue:" + nodeid);
        }
    }

    @Override
    public void start() throws LifecycleException {
        cronExecutor.start();
        context.setCurrentState(LifecycleState.STARTING);
        List<FlowNodeObject> flowNodeList = context.getInstanceObject().getCurrentNodes();
        if(flowNodeList != null) {
            for(FlowNodeObject nodeObject : flowNodeList) {
                IFlowNode flowNode = this.appContext.getFlowWorkCenter().getFlowNodeFactory().get(nodeObject, this);
                flowNode.start();
            }
        }
        if(context.isReady()) {
            cronExecutor.setHeartbeatOpen(true);
        }
        context.setCurrentState(LifecycleState.STARTED);
    }

    public void openCronSwitch() {
        if(context.isReady()) {
            cronExecutor.setHeartbeatOpen(true);
        }
    }

    @Override
    public void stop() throws LifecycleException {
        cronExecutor.close();
        jobExecutor.shutdown();
        context.setCurrentState(LifecycleState.STOPPED);
    }

    @Override
    public LifecycleState getCurrentState() {
        return context.getCurrentState();
    }

    @Override
    public FlowContext getFlowContext() {
        return context;
    }

    public AppContext getAppContext() {
        return appContext;
    }
}
